#include "config.h"

#include <stdbool.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>
#include <list.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"

#include "token_bucket.h"
#include "list.h"

#include "lsv.h"
#include "lsv_conf.h"
#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"

/** @file note
 *
 * - Tasks must exit promptly, to avoid excessive locking or waiting.
 */

/**
 * Identifies which pipeline stage is reporting when __print_pipeline() is
 * called; the value is printed as the "stage" number of the log line.
 */
typedef enum {
        PIPELINE_QUEUE_LOG    = 1,  /* passed by __do_log() */
        PIPELINE_QUEUE_BITMAP = 2,  /* passed by __do_stage_bitmap() */
} pipeline_queue_t;

/**
 * Dump pipeline statistics to the log.
 *
 * Walks the log queue and the bitmap queue, counts the entries per status,
 * asserts that the number of entries found equals the sum of the two queue
 * counters, and then emits one line holding the pipeline counters, both
 * queue depths and the first five per-status buckets.
 *
 * @param pipeline   pipeline whose queues are inspected (not modified)
 * @param queue_type stage that requested the dump; printed as "stage"
 */
static void __print_pipeline(pipeline_t *pipeline, pipeline_queue_t queue_type) {
        uint32_t per_status[PIPELINE_CHUNK_MAX] = {0};
        count_list_t *queues[2] = { &pipeline->log_queue, &pipeline->bitmap_queue };
        uint32_t total = 0;

        for (int q = 0; q < 2; q++) {
                struct list_head *cur, *next;

                list_for_each_safe(cur, next, &queues[q]->list) {
                        pipeline_chunk_t *item = list_entry(cur, pipeline_chunk_t, hook);

                        per_status[item->status]++;
                        total++;
                }
        }

        // the list contents and the cached counters must agree
        YASSERT(total == pipeline->log_queue.count + pipeline->bitmap_queue.count);

        DERROR("stage %d total %llu task %llu %llu con %llu left %u %u (%u %u %u %u %u)\n",
               queue_type,
               (LLU)pipeline->total_count,
               (LLU)pipeline->log_count,
               (LLU)pipeline->bitmap_count,
               (LLU)pipeline->log_enter_count,
               pipeline->log_queue.count,
               pipeline->bitmap_queue.count,
               per_status[0], per_status[1], per_status[2], per_status[3], per_status[4]);
}

/**
 * Assert that chunks enter the log stage in ring order.
 *
 * A remembered id of 0 means nothing has been seen yet, so the first chunk
 * is accepted unchecked. Otherwise the new id must be the previous id plus
 * one, wrapping from WBUF_CHUNK_NUM back to 1.
 *
 * @param pipeline pipeline holding last_log_chunk_id (updated on return)
 * @param chunk_id id of the chunk being added
 * @return always 0; a violation trips YASSERT
 */
static int __check_log_chunk_rule(pipeline_t *pipeline, uint32_t chunk_id) {
        if (pipeline->last_log_chunk_id != 0) {
                YASSERT(chunk_id == (pipeline->last_log_chunk_id == WBUF_CHUNK_NUM
                                     ? 1
                                     : pipeline->last_log_chunk_id + 1));
        }

        pipeline->last_log_chunk_id = chunk_id;

        return 0;
}

/**
 * Assert that chunks are committed by the bitmap stage in ring order.
 *
 * A remembered id of 0 means nothing has been seen yet, so the first chunk
 * is accepted unchecked. Otherwise the new id must be the previous id plus
 * one, wrapping from WBUF_CHUNK_NUM back to 1.
 *
 * @param pipeline pipeline holding last_bitmap_chunk_id (updated on return)
 * @param chunk_id id of the chunk being committed
 * @return always 0; a violation trips YASSERT
 */
static inline int __check_bitmap_chunk_rule(pipeline_t *pipeline, uint32_t chunk_id) {
        if (pipeline->last_bitmap_chunk_id != 0) {
                YASSERT(chunk_id == (pipeline->last_bitmap_chunk_id == WBUF_CHUNK_NUM
                                     ? 1
                                     : pipeline->last_bitmap_chunk_id + 1));
        }

        pipeline->last_bitmap_chunk_id = chunk_id;

        return 0;
}

/**
 * Move the leading run of POST_LOG entries from the log queue to the tail of
 * the bitmap queue.
 *
 * The walk stops at the first entry that is not POST_LOG, so entries reach
 * the bitmap queue in the same order in which they entered the log queue.
 *
 * @param pipeline pipeline whose queues are updated
 */
static inline void __move_log2bitmap(pipeline_t *pipeline) {
        struct list_head *cur, *next;

        list_for_each_safe(cur, next, &pipeline->log_queue.list) {
                pipeline_chunk_t *item = list_entry(cur, pipeline_chunk_t, hook);

                // preserve entry order: never skip over an unfinished entry
                if (item->status != PIPELINE_CHUNK_POST_LOG) {
                        break;
                }

                count_list_del_init(cur, &pipeline->log_queue);
                count_list_add_tail(cur, &pipeline->bitmap_queue);

                time_count_update(&pipeline->lsv_info->tc, COUNT_BITMAP_IN, 1);
        }
}

/**
 * Bitmap stage: commit every chunk whose log write has completed.
 *
 * The processing is not re-entrant, so the leading run of POST_LOG entries
 * is first detached from the shared bitmap queue into a task-local queue.
 * To keep bitmap persistence in order, only one task at a time may be in
 * the processing section; this is enforced with stage_bitmap_running.
 *
 * For each detached entry the function optionally updates the bitmap
 * (when LSV_FEATURE_BITMAP is enabled), calls chunk_post_commit(), hands the
 * log to lsv_gc_add(), frees the entry and wakes waiters on too_many_log.
 * Any failure of update_bitmap() or lsv_gc_add() trips YASSERT.
 *
 * @param arg pipeline (pipeline_t *)
 */
static void __do_stage_bitmap(void *arg) {
        int ret;
        pipeline_t *pipeline = arg;

        if (pipeline->stage_bitmap_running) {
                DINFO("stage bitmapt running %u, quit\n", pipeline->bitmap_queue.count);
                return;
        }

        struct list_head *pos, *n;
        pipeline_chunk_t *entry;

        count_list_t local_queue;
        count_list_init(&local_queue);

        // shared queue, no yield
        list_for_each_safe(pos, n, &pipeline->bitmap_queue.list) {
                entry = list_entry(pos, pipeline_chunk_t, hook);
                if (entry->status != PIPELINE_CHUNK_POST_LOG) {
                        break;
                }

                count_list_del_init(pos, &pipeline->bitmap_queue);
                count_list_add_tail(pos, &local_queue);
        }

        if (local_queue.count == 0) {
                return;
        }

        // Guarantee that only one task enters the section below.
        // Bitmaps should be handled quickly, but ideally in large batches;
        // the two goals have to be balanced.
        pipeline->stage_bitmap_running = TRUE;

        // local queue, yield, but no shared status
        DERROR("bitmap task %u\n", local_queue.count);

        // unsigned so that the values match the %u conversions below
        unsigned int total = local_queue.count;
        unsigned int count = 0;

        // TODO maybe too many items, scheduler timeout
        list_for_each_safe(pos, n, &local_queue.list) {
                entry = list_entry(pos, pipeline_chunk_t, hook);

                // only POST_LOG entries were moved into the local queue
                YASSERT(entry->status == PIPELINE_CHUNK_POST_LOG);

                // can bitmap, sort and split
                entry->status = PIPELINE_CHUNK_PRE_BITMAP;

                pipeline->lsv_info->commit_bytes += LSV_PAGE_SIZE * (LSV_WBUF_PAGE_NUM - 1);
                count++;

                __check_bitmap_chunk_rule(pipeline, entry->chunk->id);

                DINFO("%u/%u stage bitmap %p status %d chunk_id %u chunk %d page_idx %u\n",
                      count, total, entry,
                      entry->status,
                      entry->chunk_id,
                      entry->chunk->id,
                      entry->chunk->page_idx);

                if (lsv_feature & LSV_FEATURE_BITMAP) {
                        // TODO high priority
                        ret = update_bitmap(pipeline->lsv_info, entry->chunk_id, &entry->log_proto);
                        if (unlikely(ret)) {
                                YASSERT(0);
                        }
                }

                YASSERT(entry->chunk->page_idx == entry->chunk->fill_count);

                chunk_post_commit(pipeline->lsv_info, entry->chunk, 0);

                // TODO GC
                ret = lsv_gc_add(pipeline->lsv_info, &entry->log_proto);
                if (unlikely(ret)) {
                        YASSERT(0);
                }

                entry->status = PIPELINE_CHUNK_POST_BITMAP;
                pipeline->bitmap_count++;

                // done: unlink and release; the walk continues through n
                count_list_del_init(pos, &local_queue);
                yfree((void **)&entry);

                co_cond_broadcast(&pipeline->too_many_log, 0);

                // stat
                lsv_wbuf_t *wbuf = pipeline->lsv_info->wbuf;
                lsv_wbuf_qos_update(&wbuf->qos, 0, 255);

                time_count_update(&pipeline->lsv_info->tc, COUNT_BITMAP_OUT, 1);
        }

        __print_pipeline(pipeline, PIPELINE_QUEUE_BITMAP);

#if 0
        // TODO when check fill, deadlock
        if (!list_empty_careful(&pipeline->log_queue.list)) {
                schedule_task_new("stage_log", __do_stage_log, pipeline, -1);
        }
#endif

        // Leave the exclusive section BEFORE rescheduling: a follow-up task
        // that starts while the flag is still set would quit immediately and
        // the entries queued in the meantime would never be processed.
        pipeline->stage_bitmap_running = FALSE;

        // self-driving: entries that arrived while this task was busy
        if (!list_empty_careful(&pipeline->bitmap_queue.list)) {
                schedule_task_new("stage_bitmap", __do_stage_bitmap, pipeline, -1);
        }
}

static void __do_log(void *arg) {
        int ret;
        pipeline_chunk_t *entry = arg;
        pipeline_t *pipeline = entry->pipeline;
        uint32_t chunk_id;

        pipeline->log_enter_count++;

        YASSERT(entry->status == PIPELINE_CHUNK_PRE_LOG);
        {
                pipeline->lsv_info->log_write_count++;
                pipeline->lsv_info->log_bytes += LSV_PAGE_SIZE * (LSV_WBUF_PAGE_NUM - 1);

                // update 4k head crc
                entry->log_proto.head->crc = page_crc32(entry->log_proto.log);

                ret = lsv_volume_chunk_write(entry->pipeline->lsv_info,
                                             entry->log_proto.log,
                                             LSV_LOG_LOG_STORAGE_TYPE,
                                             &chunk_id);
                if (unlikely(ret)) {
                        // TODO No space
                        YASSERT(0);
                }

                entry->chunk_id = chunk_id;
                entry->log_proto.chunk_id = chunk_id;
        }
        entry->status = PIPELINE_CHUNK_POST_LOG;

        __move_log2bitmap(pipeline);

        time_count_update(&pipeline->lsv_info->tc, COUNT_LOG_OUT, 1);

        pipeline->log_count++;
        pipeline->log_enter_count--;

        __print_pipeline(pipeline, PIPELINE_QUEUE_LOG);

        schedule_task_new("stage_bitmap", __do_stage_bitmap, pipeline, -1);
}

/**
 * Log stage dispatcher: start one __do_log task per chunk that is ready.
 *
 * First moves already finished (POST_LOG) entries to the bitmap queue, then
 * scans the log queue and, for every entry that is still INIT and whose
 * chunk is reported full by chunk_is_fill(), marks it PRE_LOG and schedules
 * a "do_log" task for it.
 *
 * @note (original author) the log stage must not change the order or the
 *       number of elements in the pipeline.
 *
 * @param arg pipeline (pipeline_t *)
 */
static void __do_stage_log(void *arg) {
        pipeline_t *pipeline = arg;
        struct list_head *cur, *next;

        __move_log2bitmap(pipeline);

#if 0
        // Limit the number of log tasks, counting both submitted and
        // completed ones.
        while (TRUE) {
                if (pipeline->bitmap_queue.count < WBUF_CHUNK_NUM / 2) {
                        break;
                }

                // Only the first task waits; the others return right away so
                // that they do not occupy a task. A compensation mechanism is
                // needed so that log tasks do not go unprocessed.
                if (pipeline->is_too_many_log) {
                        return;
                }

                pipeline->is_too_many_log = TRUE;
                co_cond_wait2(&pipeline->too_many_log, "wait_bitmap");
                pipeline->is_too_many_log = FALSE;
        }
#endif

        // YASSERT(pipeline->bitmap_queue.count < WBUF_CHUNK_NUM / 2);

        // count_list_t local_queue;
        // count_list_init(&local_queue);

        // @note: an entry cannot be linked into two lists at the same time;
        // that would require a second hook field.

        // one task per ready chunk keeps the log writes parallel
        list_for_each_safe(cur, next, &pipeline->log_queue.list) {
                pipeline_chunk_t *item = list_entry(cur, pipeline_chunk_t, hook);

                if (item->status != PIPELINE_CHUNK_INIT) {
                        continue;
                }
                if (!chunk_is_fill(item->chunk)) {
                        continue;
                }

                // flip the status first so the entry is never dispatched twice
                item->status = PIPELINE_CHUNK_PRE_LOG;
                schedule_task_new("do_log", __do_log, item, -1);
        }
}

/**
 * Initialise a pipeline: empty queues, cleared flags and zeroed counters.
 *
 * @param pipeline pipeline to initialise
 * @param lsv_info volume the pipeline belongs to; stored, not copied
 * @return always 0
 */
int pipeline_init(pipeline_t *pipeline, lsv_volume_proto_t *lsv_info) {
        count_list_init(&pipeline->log_queue);
        count_list_init(&pipeline->bitmap_queue);

        pipeline->lsv_info = lsv_info;
        pipeline->stage_bitmap_running = FALSE;

        pipeline->is_too_many_log = FALSE;
        co_cond_init(&pipeline->too_many_log);

        // total_count is incremented by pipeline_add_tail() and printed by
        // __print_pipeline(), so it must start from a defined value too.
        pipeline->total_count = 0;
        pipeline->log_count = 0;
        pipeline->bitmap_count = 0;

        pipeline->log_enter_count = 0;

        // 0 means "no chunk seen yet" for the ordering checks
        pipeline->last_log_chunk_id = 0;
        pipeline->last_bitmap_chunk_id = 0;

        return 0;
}

/**
 * Append a chunk to the pipeline and kick the log stage.
 *
 * Allocates a pipeline entry for the chunk, links its log descriptor to the
 * chunk buffer, queues the entry at the tail of the log queue in status INIT
 * and schedules a "stage_log" task. Allocation failure trips YASSERT, as
 * does a chunk id that breaks the expected ring order.
 *
 * @param pipeline target pipeline
 * @param chunk    chunk buffer to queue; the entry keeps a pointer to it
 * @return always 0
 */
int pipeline_add_tail(pipeline_t *pipeline, lsv_chunk_buf_t *chunk) {
        pipeline_chunk_t *item;
        int err;

        time_count_update(&pipeline->lsv_info->tc, COUNT_LOG_IN, 1);

        err = ymalloc((void **)&item, sizeof(pipeline_chunk_t));
        if (unlikely(err)) {
                YASSERT(0);
        }

        item->pipeline = pipeline;
        item->chunk = chunk;
        item->status = PIPELINE_CHUNK_INIT;
        item->chunk_id = LSV_CHUNK_NULL_ID;

        // the log descriptor points at the data at the start of the chunk struct
        item->log_proto.log = chunk;
        lsv_log_proto_link(&item->log_proto);

        YASSERT(item->log_proto.head->page_count<=LSV_LOG_PAGE_NUM);

        // enqueue at the tail so that arrival order is preserved
        count_list_add_tail(&item->hook, &pipeline->log_queue);
        pipeline->total_count++;

        // TODO
        schedule_task_new("stage_log", __do_stage_log, pipeline, -1);

        DINFO("chunk %d page_count %u entry %p depth %u %u\n", chunk->id,
              item->log_proto.head->page_count,
              item,
              pipeline->log_queue.count,
              pipeline->bitmap_queue.count);

        // always checked; the DEBUG_MODE guard around this call is disabled
        __check_log_chunk_rule(pipeline, chunk->id);

        return 0;
}

/**
 * Schedule a log stage task if the log queue holds any entries.
 *
 * Deliberately not declared `inline`: a non-static inline definition does
 * not provide an external definition under C99/C11 rules when every
 * declaration carries `inline`, which leaves callers in other translation
 * units with an undefined reference.
 *
 * @param pipeline pipeline to drive
 * @return always 0
 */
int pipeline_do_log(pipeline_t *pipeline) {
        if (list_empty_careful(&pipeline->log_queue.list)) {
                return 0;
        }

        schedule_task_new("stage_log", __do_stage_log, pipeline, -1);

        return 0;
}
